/*
 * Copyright 2021 Shulie Technology, Co.Ltd
 * Email: shulie@shulie.io
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.shulie.surge.data.runtime.disruptor.dsl;

import io.shulie.surge.data.runtime.disruptor.*;
import io.shulie.surge.data.runtime.disruptor.util.Util;

import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A DSL-style API for setting up the disruptor pattern around a ring buffer (aka the Builder pattern).
 *
 * <p>A simple example of setting up the disruptor with two event handlers that must process events in order:</p>
 *
 * <pre><code> Disruptor&lt;MyEvent&gt; disruptor = new Disruptor&lt;MyEvent&gt;(MyEvent.FACTORY, 32, Executors.newCachedThreadPool());
 * EventHandler&lt;MyEvent&gt; handler1 = new EventHandler&lt;MyEvent&gt;() { ... };
 * EventHandler&lt;MyEvent&gt; handler2 = new EventHandler&lt;MyEvent&gt;() { ... };
 * disruptor.handleEventsWith(handler1);
 * disruptor.after(handler1).handleEventsWith(handler2);
 *
 * RingBuffer ringBuffer = disruptor.start();</code></pre>
 *
 * @param <T> the type of event used.
 */
public class Disruptor<T>
{
	private final RingBuffer<T> ringBuffer;
	private final Executor executor;
	private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
	private final AtomicBoolean started = new AtomicBoolean(false);
	private ExceptionHandler exceptionHandler;

	/**
	 * Create a new Disruptor.
	 *
	 * @param eventFactory   the factory to create events in the ring buffer.
	 * @param ringBufferSize the size of the ring buffer.
	 * @param executor       an {@link Executor} to execute event processors.
	 */
	public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor)
	{
		this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
	}

	/**
	 * Create a new Disruptor.
	 *
	 * @param eventFactory   the factory to create events in the ring buffer.
	 * @param executor       an {@link Executor} to execute event processors.
	 * @param producerType   the claim strategy to use for the ring buffer.
	 * @param waitStrategy   the wait strategy to use for the ring buffer.
	 */
	public Disruptor(final EventFactory<T> eventFactory,
			final int ringBufferSize,
			final Executor executor,
			final ProducerType producerType,
			final WaitStrategy waitStrategy)
	{
		this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
				executor);
	}

	/**
	 * Private constructor helper
	 */
	private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
	{
		this.ringBuffer = ringBuffer;
		this.executor = executor;
	}

	/**
	 * <p>Set up event handlers to handle events from the ring buffer. These handlers will process events
	 * as soon as they become available, in parallel.</p>
	 *
	 * <p>This method can be used as the start of a chain. For example if the handler <code>A</code> must
	 * process events before handler <code>B</code>:</p>
	 * <pre><code>dw.handleEventsWith(A).then(B);</code></pre>
	 *
	 * @param handlers the event handlers that will process events.
	 * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
	 */
	@SuppressWarnings("varargs")
	public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)
	{
		return createEventProcessors(new Sequence[0], handlers);
	}

	/**
	 * Set up custom event processors to handle events from the ring buffer. The Disruptor will
	 * automatically start this processors when {@link #start()} is called.
	 *
	 * @param processors the event processors that will process events.
	 * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
	 */
	public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors)
	{
		for (EventProcessor processor : processors)
		{
			consumerRepository.add(processor);
		}
		return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
	}

	/**
	 * Set up a {@link WorkerPool} to distribute an event to one of a pool of work handler threads.
	 * Each event will only be processed by one of the work handlers.
	 * The Disruptor will automatically start this processors when {@link #start()} is called.
	 *
	 * @param workHandlers the work handlers that will process events.
	 * @return a {@link EventHandlerGroup} that can be used to chain dependencies.
	 */
	@SuppressWarnings("varargs")
	public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
	{
		return createWorkerPool(new Sequence[0], workHandlers);
	}

	/**
	 * <p>Specify an exception handler to be used for any future event handlers.</p>
	 *
	 * <p>Note that only event handlers set up after calling this method will use the exception handler.</p>
	 *
	 * @param exceptionHandler the exception handler to use for any future {@link EventProcessor}.
	 */
	public void handleExceptionsWith(final ExceptionHandler exceptionHandler)
	{
		this.exceptionHandler = exceptionHandler;
	}

	/**
	 * Override the default exception handler for a specific handler.
	 * <pre>disruptorWizard.handleExceptionsIn(eventHandler).with(exceptionHandler);</pre>
	 *
	 * @param eventHandler the event handler to set a different exception handler for.
	 * @return an ExceptionHandlerSetting dsl object - intended to be used by chaining the with method call.
	 */
	public ExceptionHandlerSetting<?> handleExceptionsFor(final EventHandler<T> eventHandler)
	{
		return new ExceptionHandlerSetting<T>(eventHandler, consumerRepository);
	}

	/**
	 * <p>Create a group of event handlers to be used as a dependency.
	 * For example if the handler <code>A</code> must process events before handler <code>B</code>:</p>
	 *
	 * <pre><code>dw.after(A).handleEventsWith(B);</code></pre>
	 *
	 * @param handlers the event handlers, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventHandler[])},
	 *                 that will form the barrier for subsequent handlers or processors.
	 * @return an {@link EventHandlerGroup} that can be used to setup a dependency barrier over the specified event handlers.
	 */
	@SuppressWarnings("varargs")
	public EventHandlerGroup<T> after(final EventHandler<T>... handlers)
	{
		Sequence[] sequences = new Sequence[handlers.length];
		for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++)
		{
			sequences[i] = consumerRepository.getSequenceFor(handlers[i]);
		}

		return new EventHandlerGroup<T>(this, consumerRepository, sequences);
	}

	/**
	 * Create a group of event processors to be used as a dependency.
	 *
	 * @param processors the event processors, previously set up with {@link #handleEventsWith(com.lmax.disruptor.EventProcessor...)},
	 *                   that will form the barrier for subsequent handlers or processors.
	 * @return an {@link EventHandlerGroup} that can be used to setup a {@link SequenceBarrier} over the specified event processors.
	 * @see #after(com.lmax.disruptor.EventHandler[])
	 */
	public EventHandlerGroup<T> after(final EventProcessor... processors)
	{
		for (EventProcessor processor : processors)
		{
			consumerRepository.add(processor);
		}

		return new EventHandlerGroup<T>(this, consumerRepository, Util.getSequencesFor(processors));
	}

	/**
	 * Publish an event to the ring buffer.
	 *
	 * @param eventTranslator the translator that will load data into the event.
	 */
	public void publishEvent(final EventTranslator<T> eventTranslator)
	{
		ringBuffer.publishEvent(eventTranslator);
	}

	/**
	 * Publish an event to the ring buffer.
	 *
	 * @param eventTranslator the translator that will load data into the event.
	 * @param arg A single argument to load into the event
	 */
	public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, A arg)
	{
		ringBuffer.publishEvent(eventTranslator, arg);
	}

	/**
	 * Publish a batch of events to the ring buffer.
	 *
	 * @param eventTranslator the translator that will load data into the event.
	 * @param arg An array single arguments to load into the events. One Per event.
	 */
	public <A> void publishEvents(final EventTranslatorOneArg<T, A> eventTranslator, A[] arg)
	{
		ringBuffer.publishEvents(eventTranslator, arg);
	}

	/**
	 * <p>Starts the event processors and returns the fully configured ring buffer.</p>
	 *
	 * <p>The ring buffer is set up to prevent overwriting any entry that is yet to
	 * be processed by the slowest event processor.</p>
	 *
	 * <p>This method must only be called once after all event processors have been added.</p>
	 *
	 * @return the configured ring buffer.
	 */
	public RingBuffer<T> start()
	{
		Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
		ringBuffer.addGatingSequences(gatingSequences);

		checkOnlyStartedOnce();
		for (ConsumerInfo consumerInfo : consumerRepository)
		{
			consumerInfo.start(executor);
		}

		return ringBuffer;
	}

	/**
	 */
	public void halt()
	{
		for (ConsumerInfo consumerInfo : consumerRepository)
		{
			consumerInfo.halt();
		}
	}

	/**
	 * Waits until all events currently in the disruptor have been processed by all event processors
	 * and then halts the processors.  It is critical that publishing to the ring buffer has stopped
	 * before calling this method, otherwise it may never return.
	 *
	 * <p>This method will not shutdown the executor, nor will it await the final termination of the
	 * processor threads.</p>
	 */
	public void shutdown()
	{
		try
		{
			shutdown(-1, TimeUnit.MILLISECONDS);
		} catch (TimeoutException e)
		{
			exceptionHandler.handleOnShutdownException(e);
		}
	}

	/**
	 * <p>Waits until all events currently in the disruptor have been processed by all event processors
	 * and then halts the processors.</p>
	 *
	 * <p>This method will not shutdown the executor, nor will it await the final termination of the
	 * processor threads.</p>
	 *
	 * @param timeout  the amount of time to wait for all events to be processed. <code>-1</code> will give an infinite timeout
	 * @param timeUnit the unit the timeOut is specified in
	 */
	public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
	{
		long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
		while (hasBacklog())
		{
			if (timeout >= 0 && System.currentTimeMillis() > timeOutAt)
			{
				throw TimeoutException.INSTANCE;
			}
			// Busy spin
		}
		halt();
	}

	/**
	 * The {@link RingBuffer} used by this Disruptor.  This is useful for creating custom
	 * event processors if the behaviour of {@link BatchEventProcessor} is not suitable.
	 *
	 * @return the ring buffer used by this Disruptor.
	 */
	public RingBuffer<T> getRingBuffer()
	{
		return ringBuffer;
	}

	/**
	 * Get the value of the cursor indicating the published sequence.
	 *
	 * @return value of the cursor for events that have been published.
	 */
	public long getCursor()
	{
		return ringBuffer.getCursor();
	}

	/**
	 * The capacity of the data structure to hold entries.
	 *
	 * @return the size of the RingBuffer.
	 * @see com.lmax.disruptor.Sequencer#getBufferSize()
	 */
	public long getBufferSize()
	{
		return ringBuffer.getBufferSize();
	}

	/**
	 * Get the event for a given sequence in the RingBuffer.
	 *
	 * @param sequence for the event.
	 * @return event for the sequence.
	 * @see RingBuffer#get(long)
	 */
	public T get(final long sequence)
	{
		return ringBuffer.get(sequence);
	}

	/**
	 * Get the {@link SequenceBarrier} used by a specific handler. Note that the {@link SequenceBarrier}
	 * may be shared by multiple event handlers.
	 *
	 * @param handler the handler to get the barrier for.
	 * @return the SequenceBarrier used by <i>handler</i>.
	 */
	public SequenceBarrier getBarrierFor(final EventHandler<T> handler)
	{
		return consumerRepository.getBarrierFor(handler);
	}

	/**
	 * Confirms if all messages have been consumed by all event processors
	 */
	private boolean hasBacklog()
	{
		final long cursor = ringBuffer.getCursor();
		for (Sequence consumer : consumerRepository.getLastSequenceInChain(false))
		{
			if (cursor > consumer.get())
			{
				return true;
			}
		}
		return false;
	}

	EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
			final EventHandler<T>[] eventHandlers)
	{
		checkNotStarted();

		final Sequence[] processorSequences = new Sequence[eventHandlers.length];
		final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

		for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)
		{
			final EventHandler<T> eventHandler = eventHandlers[i];

			final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier,
					eventHandler);

			if (exceptionHandler != null)
			{
				batchEventProcessor.setExceptionHandler(exceptionHandler);
			}

			consumerRepository.add(batchEventProcessor, eventHandler, barrier);
			processorSequences[i] = batchEventProcessor.getSequence();
		}

		if (processorSequences.length > 0)
		{
			consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
		}

		return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
	}

	EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<T>[] workHandlers)
	{
		final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
		final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
		consumerRepository.add(workerPool, sequenceBarrier);
		return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
	}

	private void checkNotStarted()
	{
		if (started.get())
		{
			throw new IllegalStateException("All event handlers must be added before calling starts.");
		}
	}

	private void checkOnlyStartedOnce()
	{
		if (!started.compareAndSet(false, true))
		{
			throw new IllegalStateException("Disruptor.start() must only be called once.");
		}
	}
}
